/* SPDX-License-Identifier: MPL-2.0 */
#include "../include/zmq.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <time.h>
#include <stdarg.h>
#include <string.h>
#include <string>

#include "platform.hpp"

#if defined ZMQ_HAVE_WINDOWS
#include <windows.h>
#include <process.h>
#else
#include <pthread.h>
#include <unistd.h>
#endif


/*
   Asynchronous proxy benchmark using ZMQ_XPUB_NODROP.

   Topology:

     XPUB                      SUB
      |                         |
      +-----> XSUB -> XPUB -----/
      |       ^^^^^^^^^^^^
     XPUB      ZMQ proxy

   All connections use "inproc" transport. The two XPUB sockets start
   flooding the proxy. The throughput is computed using the bytes received
   in the SUB socket.
*/


#define HWM 10000

#ifndef ARRAY_SIZE
#define ARRAY_SIZE(x) (sizeof (x) / sizeof (*x))
#endif

#define TEST_ASSERT_SUCCESS_ERRNO(expr)                                        \
    test_assert_success_message_errno_helper (expr, NULL, #expr)

// This macro is used to avoid-variable warning. If used with an expression,
// the sizeof is not evaluated to avoid polluting the assembly code.
#ifdef NDEBUG
#define ASSERT_EXPR_SAFE(x)                                                    \
    do {                                                                       \
        (void) sizeof (x);                                                     \
    } while (0)
#else
#define ASSERT_EXPR_SAFE(x) assert (x)
#endif


static uint64_t message_count = 0;
static size_t message_size = 0;


typedef struct
{
    void *context;
    int thread_idx;
    const char *frontend_endpoint[4];
    const char *backend_endpoint[4];
    const char *control_endpoint;
} proxy_hwm_cfg_t;


int test_assert_success_message_errno_helper (int rc_,
                                              const char *msg_,
                                              const char *expr_)
{
    if (rc_ == -1) {
        char buffer[512];
        buffer[sizeof (buffer) - 1] =
          0; //  to ensure defined behavior with VC++ <= 2013
        printf ("%s failed%s%s%s, errno = %i (%s)", expr_,
                msg_ ? " (additional info: " : "", msg_ ? msg_ : "",
                msg_ ? ")" : "", zmq_errno (), zmq_strerror (zmq_errno ()));
        exit (1);
    }
    return rc_;
}

static void set_hwm (void *skt)
{
    int hwm = HWM;

    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (skt, ZMQ_SNDHWM, &hwm, sizeof (hwm)));

    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (skt, ZMQ_RCVHWM, &hwm, sizeof (hwm)));
}

static void publisher_thread_main (void *pvoid)
{
    const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
    const int idx = cfg->thread_idx;
    int optval;
    int rc;

    void *pubsocket = zmq_socket (cfg->context, ZMQ_XPUB);
    assert (pubsocket);

    set_hwm (pubsocket);

    optval = 1;
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (pubsocket, ZMQ_XPUB_NODROP, &optval, sizeof (optval)));

    optval = 1;
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (pubsocket, ZMQ_SNDTIMEO, &optval, sizeof (optval)));

    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_connect (pubsocket, cfg->frontend_endpoint[idx]));

    //  Wait before starting TX operations till 1 subscriber has subscribed
    //  (in this test there's 1 subscriber only)
    char buffer[32] = {};
    rc = TEST_ASSERT_SUCCESS_ERRNO (
      zmq_recv (pubsocket, buffer, sizeof (buffer), 0));
    if (rc != 1) {
        printf ("invalid response length: expected 1, received %d", rc);
        exit (1);
    }
    if (buffer[0] != 1) {
        printf ("invalid response value: expected 1, received %d",
                (int) buffer[0]);
        exit (1);
    }

    zmq_msg_t msg_orig;
    rc = zmq_msg_init_size (&msg_orig, message_size);
    assert (rc == 0);
    memset (zmq_msg_data (&msg_orig), 'A', zmq_msg_size (&msg_orig));

    uint64_t send_count = 0;
    while (send_count < message_count) {
        zmq_msg_t msg;
        zmq_msg_init (&msg);
        rc = zmq_msg_copy (&msg, &msg_orig);
        assert (rc == 0);

        //  Send the message to the socket
        rc = zmq_msg_send (&msg, pubsocket, 0);
        if (rc != -1) {
            send_count++;
        } else {
            TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
        }
    }

    zmq_close (pubsocket);
    //printf ("publisher thread ended\n");
}

static void subscriber_thread_main (void *pvoid)
{
    const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
    const int idx = cfg->thread_idx;

    void *subsocket = zmq_socket (cfg->context, ZMQ_SUB);
    assert (subsocket);

    set_hwm (subsocket);

    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (subsocket, ZMQ_SUBSCRIBE, 0, 0));

    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_connect (subsocket, cfg->backend_endpoint[idx]));

    //  Receive message_count messages
    uint64_t rxsuccess = 0;
    bool success = true;
    while (success) {
        zmq_msg_t msg;
        int rc = zmq_msg_init (&msg);
        assert (rc == 0);

        rc = zmq_msg_recv (&msg, subsocket, 0);
        if (rc != -1) {
            TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
            rxsuccess++;
        }

        if (rxsuccess == message_count)
            break;
    }

    //  Cleanup

    zmq_close (subsocket);
    //printf ("subscriber thread ended\n");
}

static void proxy_thread_main (void *pvoid)
{
    const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
    int rc;

    //  FRONTEND SUB

    void *frontend_xsub = zmq_socket (
      cfg->context,
      ZMQ_XSUB); // the frontend is the one exposed to internal threads (INPROC)
    assert (frontend_xsub);

    set_hwm (frontend_xsub);

    //  Bind FRONTEND
    for (unsigned int i = 0; i < ARRAY_SIZE (cfg->frontend_endpoint); i++) {
        const char *ep = cfg->frontend_endpoint[i];
        if (ep != NULL) {
            assert (strlen (ep) > 5);
            rc = zmq_bind (frontend_xsub, ep);
            ASSERT_EXPR_SAFE (rc == 0);
        }
    }

    //  BACKEND PUB

    void *backend_xpub = zmq_socket (
      cfg->context,
      ZMQ_XPUB); //  the backend is the one exposed to the external world (TCP)
    assert (backend_xpub);

    int optval = 1;
    rc =
      zmq_setsockopt (backend_xpub, ZMQ_XPUB_NODROP, &optval, sizeof (optval));
    ASSERT_EXPR_SAFE (rc == 0);

    set_hwm (backend_xpub);

    //  Bind BACKEND
    for (unsigned int i = 0; i < ARRAY_SIZE (cfg->backend_endpoint); i++) {
        const char *ep = cfg->backend_endpoint[i];
        if (ep != NULL) {
            assert (strlen (ep) > 5);
            rc = zmq_bind (backend_xpub, ep);
            ASSERT_EXPR_SAFE (rc == 0);
        }
    }

    //  CONTROL REP

    void *control_rep = zmq_socket (
      cfg->context,
      ZMQ_REP); //  This one is used by the proxy to receive&reply to commands
    assert (control_rep);

    //  Bind CONTROL
    rc = zmq_bind (control_rep, cfg->control_endpoint);
    ASSERT_EXPR_SAFE (rc == 0);

    //  Start proxying!

    zmq_proxy_steerable (frontend_xsub, backend_xpub, NULL, control_rep);

    zmq_close (frontend_xsub);
    zmq_close (backend_xpub);
    zmq_close (control_rep);
    //printf ("proxy thread ended\n");
}

void terminate_proxy (const proxy_hwm_cfg_t *cfg)
{
    //  CONTROL REQ

    void *control_req = zmq_socket (
      cfg->context,
      ZMQ_REQ); //  This one can be used to send command to the proxy
    assert (control_req);

    //  Connect CONTROL-REQ: a socket to which send commands
    int rc = zmq_connect (control_req, cfg->control_endpoint);
    ASSERT_EXPR_SAFE (rc == 0);

    //  Ask the proxy to exit: the subscriber has received all messages

    rc = zmq_send (control_req, "TERMINATE", 9, 0);
    ASSERT_EXPR_SAFE (rc == 9);

    zmq_close (control_req);
}

//  The main thread simply starts some publishers, a proxy,
//  and a subscriber. Finish when all packets are received.

int main (int argc, char *argv[])
{
    if (argc != 3) {
        printf ("usage: proxy_thr <message-size> <message-count>\n");
        return 1;
    }

    message_size = atoi (argv[1]);
    message_count = atoi (argv[2]);
    printf ("message size: %d [B]\n", (int) message_size);
    printf ("message count: %d\n", (int) message_count);

    void *context = zmq_ctx_new ();
    assert (context);

    int rv = zmq_ctx_set (context, ZMQ_IO_THREADS, 4);
    ASSERT_EXPR_SAFE (rv == 0);

    //  START ALL SECONDARY THREADS

    const char *pub1 = "inproc://perf_pub1";
    const char *pub2 = "inproc://perf_pub2";
    const char *sub1 = "inproc://perf_backend";

    proxy_hwm_cfg_t cfg_global = {};
    cfg_global.context = context;
    cfg_global.frontend_endpoint[0] = pub1;
    cfg_global.frontend_endpoint[1] = pub2;
    cfg_global.backend_endpoint[0] = sub1;
    cfg_global.control_endpoint = "inproc://ctrl";

    //  Proxy
    proxy_hwm_cfg_t cfg_proxy = cfg_global;
    void *proxy = zmq_threadstart (&proxy_thread_main, (void *) &cfg_proxy);
    assert (proxy != 0);

    //  Subscriber 1
    proxy_hwm_cfg_t cfg_sub1 = cfg_global;
    cfg_sub1.thread_idx = 0;
    void *subscriber =
      zmq_threadstart (&subscriber_thread_main, (void *) &cfg_sub1);
    assert (subscriber != 0);

    //  Start measuring
    void *watch = zmq_stopwatch_start ();

    //  Publisher 1
    proxy_hwm_cfg_t cfg_pub1 = cfg_global;
    cfg_pub1.thread_idx = 0;
    void *publisher1 =
      zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub1);
    assert (publisher1 != 0);

    //  Publisher 2
    proxy_hwm_cfg_t cfg_pub2 = cfg_global;
    cfg_pub2.thread_idx = 1;
    void *publisher2 =
      zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub2);
    assert (publisher2 != 0);

    //  Wait for all packets to be received
    zmq_threadclose (subscriber);

    //  Stop measuring
    unsigned long elapsed = zmq_stopwatch_stop (watch);
    if (elapsed == 0)
        elapsed = 1;

    unsigned long throughput =
      (unsigned long) ((double) message_count / (double) elapsed * 1000000);
    double megabits = (double) (throughput * message_size * 8) / 1000000;

    printf ("mean throughput: %d [msg/s]\n", (int) throughput);
    printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);

    //  Wait for the end of publishers...
    zmq_threadclose (publisher1);
    zmq_threadclose (publisher2);

    //  ... then close the proxy
    terminate_proxy (&cfg_proxy);
    zmq_threadclose (proxy);

    int rc = zmq_ctx_term (context);
    ASSERT_EXPR_SAFE (rc == 0);

    return 0;
}
